package com.dahuan.window;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Streaming job that counts the sensor readings received per sensor id in
 * 5-second keyed windows, using an incremental {@link AggregateFunction}.
 *
 * <p>Input is read as text lines from a socket on {@code localhost:7777}. Each
 * line is expected to hold at least three comma-separated fields:
 * {@code id,timestamp,temperature}. The per-window counts are printed via
 * {@code print()}.
 */
public class Window_AggregateFunction {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        // Read raw text lines from a local socket (a text file source could be
        // substituted here with env.readTextFile(path)).
        DataStreamSource<String> localhost = env.socketTextStream( "localhost", 7777 );

        // Parse every text line into a SensorReading.
        DataStream<SensorReading> dataStream = localhost.map( data -> {
            String[] fields = data.split( "," );
            if (fields.length < 3) {
                // Fail with the offending line instead of a bare ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException(
                        "Expected 'id,timestamp,temperature' but got: " + data );
            }
            // valueOf replaces the deprecated boxed-type constructors. The numeric fields
            // are trimmed because Long parsing rejects surrounding whitespace; the id is
            // left untouched so keys are unchanged.
            return new SensorReading(
                    fields[0],
                    Long.valueOf( fields[1].trim() ),
                    Double.valueOf( fields[2].trim() ) );
        } );

        SingleOutputStreamOperator<Integer> resultStream = dataStream.keyBy( "id" )
                .timeWindow( Time.seconds( 5 ) )
                // Incremental aggregation: the accumulator is updated per element,
                // so only a single Integer is kept per key and window.
                .aggregate( new AggregateFunction<SensorReading, Integer, Integer>() {
                    /** Creates the initial accumulator: a count of zero. */
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    /** Increments the count by one for each incoming reading; the reading's content is not inspected. */
                    @Override
                    public Integer add(SensorReading value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    /** Returns the current count as the window result. */
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    /** Merges two accumulators by summing their counts. */
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                } );

        resultStream.print();
        env.execute( "Window_AggregateFunction" );
    }
}
